Module 4 - Programming Assignment

This is the notebook for the Module 4 Programming Assignment.

Here are a few tips for using the iPython HTML notebook:

  1. You can use tab . Try le<<tab> and see the available functions.
  2. You can change the type of cell by picking "Code" or "Markdown" from the menu at the left.
  3. If you keep typing in a Markdown text area, you will eventually get scroll bars. To prevent this, hit return when you come to the end of the window. Only a double return creates a new paragraph.
  4. You can find out more about Markdown text here: http://daringfireball.net/projects/markdown/ (Copy this link and put it in another tab for reference--Don't click it or you'll leave your notebook!).
  5. Every so often, restart the kernel, clear all output and run all code cells so you can be certain that you didn't define something out of order.

You should rename this notebook to be <your JHED id>_PR1.ipynb Do it right now.

Make certain the entire notebook executes before you submit it. (See Hint #5 above)

Change the following variables:


In [1]:
name = "Shehzad Qureshi"
jhed_id = "squresh6"
if name == "Student Name" or jhed_id == "sname1":
    raise Exception( "Change the name and/or JHED ID...preferrably to yours.")

Add whatever additional imports you require here. Stick with the standard libraries and those required by the class. The import gives you access to these functions: http://ipython.org/ipython-doc/stable/api/generated/IPython.core.display.html (Copy this link) Which, among other things, will permit you to display HTML as the result of evaluated code (see HTML() or display_html()).


In [2]:
from IPython.core.display import *
from StringIO import StringIO
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

This problem is very similar to the one in Module 1 that we solved with A* search. The main difference is that we now have stochastic movement (actions don't always result in the desired successor state) so we can't use A* search to solve it. Instead you're going to use Q-Learning (reinforcement learning).

As before, we're going to simplify the problem by working in a grid world. The symbols that form the grid have a special meaning as they specify the type of the terrain and the cost to enter a grid cell with that type of terrain:

token terrain cost . plains 1

  • forest 3 ^ hills 5 ~ swamp 7 x mountains impassible </code>

When you go from a plains node to a forest node it costs 3. When you go from a forest node to a plains node, it costs 1. You can think of the grid as a big graph. Each grid cell (terrain symbol) is a node and there are edges to the north, south, east and west (except at the edges). Unlike A* Search, Reinforcement Learning will find a policy for the entire state space. The goal is (26, 26) with a reward of 100.0.

To avoid global variables, we have a read_world() function that takes a filename and returns the world as list of lists. The same coordinates reversal applies: (x, y) is world[ y][ x].


In [3]:
def read_world( filename):
    with open( filename, 'r') as f:
        world_data = [x for x in f.readlines()]
    f.closed
    world = []
    for line in world_data:
        line = line.strip()
        if line == "": continue
        world.append([x for x in line])
    return world

Next we create a dict of movement costs. Note that we've negated them this time because RL requires negative costs and positive rewards:


In [4]:
# do not reference this as a global variable.
costs = { '.': -1, '*': -3, '^': -5, '~': -7}
costs


Out[4]:
{'*': -3, '.': -1, '^': -5, '~': -7}

and a list of offsets for NEIGHBORS. You'll need to work this into your actions, A, parameter.


In [5]:
NEIGHBORS = [(0,-1), (1,0), (0,1), (-1,0)]

The transition function, T, is 0.70 for the desired direction, and 0.10 each for the other possible directions. That is, if I select "up" then 70% of the time, I go up but 10% of the time I go left, 10% of the time I go right and 10% of the time I go down. If you're at the edge of the map, you simply bounce back to the current state.

You need to implement q_learning() with the following parameters:

  • world: a list of lists of terrain (this is S from S, A, T, gamma, R)
  • costs: a hash of costs by terrain (this is part of R)
  • goal: A Tuple of (x, y) stating the goal state.
  • reward: The reward for achieving the goal state.
  • actions: a List of possible actions, A.
  • gamma: the discount rate
  • alpha: the learning rate

you will return a policy: {(x1, y1): action1, (x2, y2): action2, ...} Remember...a policy is what to do in any/every state. Notice how this is different that A* search which only returns one path from start to goal. For a 2d navigation problem, you can also print out a map of what action to do in every state.

Use a goal that is the lower right hand corner of the world.

There are a lot of details that I have left up to you. Watch and re-watch the lecture on Q-Learning. Ask questions. You need to implement a way to pick initial states for each iteration and you need a way to balance exploration and exploitation while learning. You may have to experiment with different gamma and alpha values.

Remember that you should follow the general style guidelines for this course: well-named, short, focused functions with limited indentation using Markdown documentation that explains their implementation and the AI concepts behind them.

This assignment sometimes wrecks havoc with IPython notebook. Create a small test world (5x6) to work with. Pick an asymmetric world to avoid edge cases.

Reinforcement Learning

Let's start by pre-defining some variables that will come in handy later. We're going to artificially limit the number of episodes and iterations per episode to keep iPython from running amok.


In [6]:
debug = False
iter_max = 10000
episodes = 10000
epsilon = 0.15
omega = 0.01
Ne = 5
Pe = 10

Now we need a way to initialize our Q Matrix. This matrix will be a function of the size of the world and the number of actions per state. We will also set our goal state to the reward value provided.


In [7]:
def initialize_q_matrix(world, actions, goal, reward):
    xlen, ylen, alen = len(world[0]), len(world), len(actions)
    Q = np.zeros((ylen, xlen, alen))
    x, y = goal
    Q[y][x] = reward
    return Q

Next we need a function to pick an initial state. We shall try to start closer to the goal and work our way further and further away.


In [8]:
def initialize_state(world, goal, episode):
    cx, cy = int(goal[0] - (float(episode)/Pe)), int(goal[1] - (float(episode)/Pe))
    if cx < 0: cx = 0
    if cx < 0: cy = 0
    bx, by = len(world[0]), len(world)
    positions = []
    positions.extend([(i,cy) for i in xrange(cx, bx) if world[cy][i] in costs])
    positions.extend([(cx,i) for i in xrange(cy, by) if world[i][cx] in costs])
    if goal in positions: positions.remove(goal) 
    state = positions[np.random.randint(0, len(positions))]
    if debug: print("Initialized state to {0}".format(state))
    return state

We also need a method to pick an action once a state has been selected. This shall be a combination of methods:

  • First we shall attempt to pick actions that have not been picked as much. This is needed to ensure that all states are initially explored equally.
  • Then we shall obtain a random value and compare it with epsilon, defined above. This will be our exploration/exploitation factor. If this number is less than epsilon we shall explore, else we shall exploit, i.e. pick the largest Q value for this state.

In [9]:
def select_action(Q, N, s):
    x, y = s
    n = [i for i,v in enumerate(N[y][x] < Ne) if v]
    if len(n) > 0: action = np.random.choice(n)
    else:
        e = np.random.random()
        if e < epsilon: action = np.random.randint(0, len(Q[y][x]))
        else: action = np.argmax(Q[y][x])
    N[y][x][action] = N[y][x][action] + 1
    if debug: print("  Selecting {0} from {1} for {2}".format(action, Q[y][x], (x,y)))
    return action

Every action needs to be simulated with the transition model given above. The agent has no knowledge of this transition model. We also do bounds checking for each action and every out-of-bounds move will result in staying in the same state but will yield a larger penalty.


In [10]:
def execute_action(world, costs, state, action):
    p = [0.7 if x == action else 0.1 for x in xrange(len(NEIGHBORS))]
    if debug: print("  Attempting to move {0} from {1} with p={2}".format(NEIGHBORS[action],state,p))
    move = NEIGHBORS[np.random.choice(xrange(len(NEIGHBORS)), p=p)]
    bound_x, bound_y = len(world[0]) - 1, len(world) - 1
    x, y = state[0] + move[0], state[1] + move[1]
    if x < 0 or x > bound_x or y < 0 or y > bound_y or world[y][x] not in costs:
        new_state, reward = state, -100
    else:
        new_state = (x,y)
        reward = collect_reward(world, costs, new_state)
    if debug: print("  Executing {0} from {1} to {2}".format(move,state,new_state))
    return new_state, reward

Naturally, every action will result in a reward.


In [11]:
def collect_reward(world, costs, new_state):
    x, y = new_state
    reward = costs[world[y][x]]
    if debug: print("  Collecting reward of {0} from {1}".format(reward,new_state))
    return reward

This method will convert our final Q matrix into an actual policy. The policy is a dictionary of each available state mapped to the action with the highest value.


In [12]:
def get_policy(Q):
    policy = dict()
    y, x, a = Q.shape
    for i in xrange(y):
        for j in xrange(x):
            policy[(j,i)] = NEIGHBORS[np.argmax(Q[i][j])]
    return policy

The Q-Learning function is defined below.


In [13]:
def q_learning(world, costs, goal_state, goal_reward, actions, gamma, alpha):
    Q = initialize_q_matrix(world, actions, goal_state, goal_reward)
    N = np.zeros_like(Q)
    for i in xrange(episodes):
        s, epcopy, iter_count = initialize_state(world, goal_state, i + 1), np.copy(Q), 0
        while not s == goal_state:
            a, (sx,sy) = select_action(Q, N, s), s
            (nx,ny), reward = execute_action(world, costs, s, a) 
            Q[sy][sx][a] = (1-alpha)*(Q[sy][sx][a]) + (alpha*(reward+(gamma*np.max(Q[ny][nx]))))
            s, iter_count = (nx, ny), iter_count + 1
            if iter_count > iter_max: break
        if np.max(Q - epcopy) < omega: break
    return get_policy(Q)

We need a way to visualize our results so we shall use matplotlib once again and borrow Problem 1's definitions. The resulting plot will have two plots side-by-side, one for the map and one for the policy. Overlaying the policy on top of the map did not seem visually appealing.


In [14]:
def setup_map(ax, world):
    c = dict()
    c['.'] = ('.', 0.4)
    c['x'] = ('x', 0.7)
    c['^'] = ('^', 0.4)
    c['*'] = ('*', 0.4)
    c['~'] = ('d', 0.4)
    for y, _y in enumerate(world):
        for x, _x in enumerate(_y):
            params = c[world[y][x]]
            ax.plot(x, y, params[0], color='black', alpha=params[1])

This will draw our policy on the given axis.


In [15]:
def overlay_policy(ax, goal, policy, world):
    movement_map = ["^", ">", "v", "<"]
    for x, y in policy:
        if (x, y) == goal: symbol = "H"
        else: symbol = movement_map[NEIGHBORS.index(policy[(x,y)])] if world[y][x] in costs else "x"
        ax.plot(x, y, symbol, color='blue', alpha=0.8)

And now our main plotting function. We must remember to invert the y-axis after plotting the data.


In [16]:
def plot_policy(world, goal, policy):
    fig, (ax_map, ax_pol) = plt.subplots(1, 2)
    ax_map.set_axis_off()
    ax_pol.set_axis_off()
    ax_map.set_xlim((-1, len(world[0])))
    ax_map.set_ylim((-1, len(world)))
    ax_pol.set_xlim((-1, len(world[0])))
    ax_pol.set_ylim((-1, len(world)))
    setup_map(ax_map, world)
    overlay_policy(ax_pol, goal, policy, world)
    ax_map.invert_yaxis()
    ax_pol.invert_yaxis()
    plt.show()

Test on a smaller map first.


In [17]:
world = read_world("test_world.txt")
goal = (6,6)
%time policy = q_learning(world, costs, goal, 200.0, NEIGHBORS, 0.75, 0.25)
plot_policy(world, goal, policy)


Wall time: 4.2 s

I had to play with the discount factor and rewards for a bit to get the policy that I thought was good enough. Due to the randomness of the algorithm (selecting the initial state, selecting the actions, simulation of the transition model) no two runs are exactly alike.

And now we'll test it with our final world model from problem 1.


In [18]:
world = read_world("world.txt")
goal = (26, 26)
Ne = 5
epsilon = 0.15
Pe = 500
episodes = 1000000
iter_max = 1000000
%time policy = q_learning(world, costs, goal, 500.0, NEIGHBORS, 0.8, 0.25)
plot_policy(world, goal, policy)


Wall time: 12 s

I don't think this is the perfect policy, but I'm not sure there is one. I do notice that there may be loops if the agent starts on certain squares. I wonder if this has anything to do with the simulation of the transition model.

Summary

Reinforcement Learning is a fascinating AI algorithm that has many applications including Robotics, a topic I am very fond of. I can see how this algorithm can be used to get a robot to learn about its environment even if it knows relatively nothing about it.

This isn't a trivial algorithm to understand though. It seems that the outcome of the algorithm is highly dependent on the input and structure of the algorithms itself:

  • Selecting an initial state is important, especially in the early iterations of the algorithm. I opted to pick a square close to the goal and work my way outwards. But what constitutes as close? In a 2D world I still have a vast array of options to choose from that are considered close.
  • Selecting an action is equally important. I chose to use a combination of statistics and randomness. Initially the algorithm will try to pick an action inverse to the number of times it has already been picked, i.e. lesser picked actions are given priority. This gives the agent a chance to explore the area evenly. After that, we use a random exploration/exploitation function to determine the action. But if I'm already using a statistical function to explore, do I need the randomness afterwards? Initial testing told me yes.
  • Selecting values for alpha, gamma, omega, epsilon, Ne (statistical function for action-selection), and Pe (used to determine how close to the goal the initial state should be) is a very complex task. In fact, balancing out these variables seems like a CSP.

In [19]:
# my testing